package com.stack.kafka.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Quick-start Kafka producer: sends a single String key/value record to the
 * {@code stack-topic} topic and exits.
 *
 * <p>Only the settings required for this example are configured. Other producer
 * settings that are often tuned include {@code ProducerConfig.ACKS_CONFIG}
 * (acknowledgement mode) and {@code ProducerConfig.COMPRESSION_TYPE_CONFIG}
 * (payload compression). For asynchronous delivery, the producer also offers
 * {@code send(record, callback)}.
 */
public class ProducerQuickStart {

    /**
     * Builds the producer configuration, sends one record and waits for the
     * broker's acknowledgement.
     *
     * @param args command-line arguments (unused)
     * @throws ExecutionException   if the send ultimately fails; the cause carries the
     *                              underlying error
     * @throws InterruptedException if the thread is interrupted while waiting for the
     *                              send result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. Producer configuration.
        Properties properties = new Properties();
        // Address of the Kafka broker to bootstrap from.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        // Number of retries when a send fails.
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Serializer for the record key.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer for the record value.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer. try-with-resources guarantees close() runs even when
        //    sending throws; close() also flushes any records that are still buffered.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {

            // Record to send: topic, key, value.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("stack-topic", "哇哈哈", "kafka驾到，统统闪开");

            // 3. Send and block on the returned Future so that a delivery failure is
            //    reported as an ExecutionException instead of being silently dropped.
            producer.send(record).get();
        }
    }

}
